#ifndef XRPL_SERVER_BASEHTTPPEER_H_INCLUDED
#define XRPL_SERVER_BASEHTTPPEER_H_INCLUDED

#include <xrpl/basics/Log.h>
#include <xrpl/beast/net/IPAddressConversion.h>
#include <xrpl/beast/utility/instrumentation.h>
#include <xrpl/server/Session.h>
#include <xrpl/server/detail/Spawn.h>
#include <xrpl/server/detail/io_list.h>

#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio/ssl/stream.hpp>
#include <boost/asio/strand.hpp>
#include <boost/asio/streambuf.hpp>
#include <boost/beast/core/stream_traits.hpp>
#include <boost/beast/http/dynamic_body.hpp>
#include <boost/beast/http/message.hpp>
#include <boost/beast/http/parser.hpp>
#include <boost/beast/http/read.hpp>

#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstring>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

namespace ripple {

/** Represents an active connection.

    Common base for HTTP peers. The concrete peer type is supplied as
    `Impl`; this class reaches into it through impl() for the `stream_`
    member and for `shared_from_this()`, so `Impl` must derive from this
    class and provide both.

    @tparam Handler Type notified about the session; this file only calls
                    `onClose(Session&, error_code)` on it (from the
                    destructor).
    @tparam Impl    The most-derived peer type.
*/
template <class Handler, class Impl>
class BaseHTTPPeer : public io_list::work, public Session
{
protected:
    using clock_type = std::chrono::system_clock;
    using error_code = boost::system::error_code;
    using endpoint_type = boost::asio::ip::tcp::endpoint;
    using yield_context = boost::asio::yield_context;

    enum {
        // Size of our read/write buffer
        bufferSize = 4 * 1024,

        // Max seconds without completing a message
        timeoutSeconds = 30,
        timeoutSecondsLocal = 3  // used for localhost clients
    };

    /** An owned copy of a block of bytes waiting to be written. */
    struct buffer
    {
        /** Copy `len` bytes starting at `ptr` into freshly allocated storage.

            A zero-length buffer performs no copy, so `ptr` may be null in
            that case.
        */
        buffer(void const* ptr, std::size_t len)
            : data(new char[len]), bytes(len), used(0)
        {
            // memcpy requires valid pointers even when the length is zero.
            if (len != 0)
                std::memcpy(data.get(), ptr, len);
        }

        // Owned storage holding the copied bytes
        std::unique_ptr<char[]> data;
        // Number of bytes stored in `data`
        std::size_t bytes;
        // Initialized to zero; not otherwise referenced in this header
        std::size_t used;
    };

    Port const& port_;
    Handler& handler_;
    // Work guard and strand are both created from the executor passed to
    // the constructor.
    boost::asio::executor_work_guard<boost::asio::executor> work_;
    boost::asio::strand<boost::asio::executor> strand_;
    endpoint_type remote_address_;
    beast::Journal const journal_;

    // Log prefix of the form "#<nid_> " and the numeric id it is built from
    std::string id_;
    std::size_t nid_;

    boost::asio::streambuf read_buf_;
    http_request_type message_;
    // wq_ collects buffers queued by write(); on_write() swaps them into
    // wq2_, which holds the buffers of the async_write currently in flight.
    std::vector<buffer> wq_;
    std::vector<buffer> wq2_;
    // Held whenever wq_ / wq2_ are appended to, swapped or tested together
    std::mutex mutex_;
    // Set by close(true): close the connection once the write queue drains
    bool graceful_ = false;
    // Set by complete() / close(bool), cleared at the start of do_read()
    bool complete_ = false;
    // First error recorded by fail(); reported to the handler on destruction
    boost::system::error_code ec_;

    // request_count_ and bytes_in_ are not modified in this header
    // (presumably maintained by the derived peer - verify there);
    // bytes_out_ is advanced by on_write().
    int request_count_ = 0;
    std::size_t bytes_in_ = 0;
    std::size_t bytes_out_ = 0;

    //--------------------------------------------------------------------------

public:
    /** Create a peer.

        @param port           Port configuration, stored by reference.
        @param handler        Handler, stored by reference.
        @param executor       Executor used for the work guard and strand.
        @param journal        Destination for log messages.
        @param remote_address Endpoint of the remote side.
        @param buffers        Bytes copied into the read buffer before any
                              read is issued.
    */
    template <class ConstBufferSequence>
    BaseHTTPPeer(
        Port const& port,
        Handler& handler,
        boost::asio::executor const& executor,
        beast::Journal journal,
        endpoint_type remote_address,
        ConstBufferSequence const& buffers);

    /** Notifies the handler through onClose() and logs the request count. */
    virtual ~BaseHTTPPeer();

    /** Returns this object viewed as a Session. */
    Session&
    session()
    {
        return *this;
    }

    /** Close the lowest layer of the stream, hopping to the strand first
        when called from elsewhere.
    */
    void
    close() override;

protected:
    /** Returns this object as the derived `Impl` type. */
    Impl&
    impl()
    {
        return *static_cast<Impl*>(this);
    }

    /** Record the first error, log it and close the stream.

        Does nothing when an error is already recorded or when `ec` is
        `operation_aborted`.
    */
    void
    fail(error_code ec, char const* what);

    /** Arm the stream expiry (shorter for loopback clients). */
    void
    start_timer();

    /** Disarm the stream expiry. */
    void
    cancel_timer();

    /** Fail the session with a `timed_out` error. */
    void
    on_timer();

    /** Coroutine body: read one HTTP message and dispatch it. */
    void
    do_read(yield_context do_yield);

    /** Completion handler that drives the write queue. */
    void
    on_write(error_code const& ec, std::size_t bytes_transferred);

    /** Coroutine body: stream the output of `writer` to the peer. */
    void
    do_writer(
        std::shared_ptr<Writer> const& writer,
        bool keep_alive,
        yield_context do_yield);

    /** Invoked by do_read() after a message was read without error. */
    virtual void
    do_request() = 0;

    /** Invoked when the connection should be shut down in an orderly way. */
    virtual void
    do_close() = 0;

    // Session

    beast::Journal
    journal() override
    {
        return journal_;
    }

    Port const&
    port() override
    {
        return port_;
    }

    beast::IP::Endpoint
    remoteAddress() override
    {
        return beast::IPAddressConversion::from_asio(remote_address_);
    }

    http_request_type&
    request() override
    {
        return message_;
    }

    /** Queue a copy of `bytes` bytes at `buffer` for sending. */
    void
    write(void const* buffer, std::size_t bytes) override;

    /** Send the output of `writer` from a new coroutine on the strand. */
    void
    write(std::shared_ptr<Writer> const& writer, bool keep_alive) override;

    /** Returns a shared pointer that keeps this session alive. */
    std::shared_ptr<Session>
    detach() override;

    /** Mark the current response as fully queued. */
    void
    complete() override;

    /** Close the session, optionally after pending writes are flushed. */
    void
    close(bool graceful) override;
};

//------------------------------------------------------------------------------

/** Set up the peer, preload the read buffer and assign a log id.

    The bytes in `buffers` are copied into `read_buf_`. The id is taken
    from a counter that is local to this constructor, so it is shared by
    all peers created through the same template instantiation.
*/
template <class Handler, class Impl>
template <class ConstBufferSequence>
BaseHTTPPeer<Handler, Impl>::BaseHTTPPeer(
    Port const& port,
    Handler& handler,
    boost::asio::executor const& executor,
    beast::Journal journal,
    endpoint_type remote_address,
    ConstBufferSequence const& buffers)
    : port_(port)
    , handler_(handler)
    , work_(boost::asio::make_work_guard(executor))
    , strand_(boost::asio::make_strand(executor))
    , remote_address_(remote_address)
    , journal_(journal)
{
    // Seed the read buffer with whatever the caller handed over.
    auto const incoming = boost::asio::buffer_size(buffers);
    auto const copied =
        boost::asio::buffer_copy(read_buf_.prepare(incoming), buffers);
    read_buf_.commit(copied);

    // Hand out the next sequential id and derive the log prefix from it.
    static std::atomic<int> sid;
    nid_ = ++sid;
    id_ = "#" + std::to_string(nid_) + " ";

    JLOG(journal_.trace()) << id_ << "accept:    " << remote_address_.address();
}

/** Report the close (with the recorded error, if any) to the handler and
    log how many requests were counted.
*/
template <class Handler, class Impl>
BaseHTTPPeer<Handler, Impl>::~BaseHTTPPeer()
{
    handler_.onClose(session(), ec_);

    // Singular for exactly one request, plural otherwise.
    char const* const noun = (request_count_ == 1) ? " request" : " requests";
    JLOG(journal_.trace()) << id_ << "destroyed: " << request_count_ << noun;
}

/** Close the lowest layer of the stream.

    When invoked off the strand the call is re-posted to the strand, with
    a shared pointer to the peer bound into the posted call.
*/
template <class Handler, class Impl>
void
BaseHTTPPeer<Handler, Impl>::close()
{
    if (strand_.running_in_this_thread())
    {
        boost::beast::get_lowest_layer(impl().stream_).close();
        return;
    }

    // close is overloaded; pick the zero-argument version explicitly.
    using closer = void (BaseHTTPPeer::*)();
    post(
        strand_,
        std::bind(
            static_cast<closer>(&BaseHTTPPeer::close),
            impl().shared_from_this()));
}

//------------------------------------------------------------------------------

/** Record an error, log it and close the stream.

    Only the first error is kept: the call is ignored if an error has
    already been recorded, or if `ec` is `operation_aborted`.

    @param ec   The error being reported.
    @param what Short label for the failing operation, used in the log.
*/
template <class Handler, class Impl>
void
BaseHTTPPeer<Handler, Impl>::fail(error_code ec, char const* what)
{
    if (ec_ || ec == boost::asio::error::operation_aborted)
        return;

    ec_ = ec;
    JLOG(journal_.trace()) << id_ << std::string(what) << ": " << ec.message();
    boost::beast::get_lowest_layer(impl().stream_).close();
}

/** Arm the expiry on the lowest stream layer.

    Loopback peers get `timeoutSecondsLocal`, everyone else
    `timeoutSeconds`.
*/
template <class Handler, class Impl>
void
BaseHTTPPeer<Handler, Impl>::start_timer()
{
    bool const local = remote_address_.address().is_loopback();
    int const limit = local ? timeoutSecondsLocal : timeoutSeconds;

    boost::beast::get_lowest_layer(impl().stream_)
        .expires_after(std::chrono::seconds(limit));
}

// Disarm the expiry previously set on the lowest stream layer
template <class Handler, class Impl>
void
BaseHTTPPeer<Handler, Impl>::cancel_timer()
{
    auto& lowest = boost::beast::get_lowest_layer(impl().stream_);
    lowest.expires_never();
}

// Invoked when a read or write reports a timeout: fails the session with
// a generic `timed_out` error labelled "timer".
template <class Handler, class Impl>
void
BaseHTTPPeer<Handler, Impl>::on_timer()
{
    fail(
        boost::system::errc::make_error_code(boost::system::errc::timed_out),
        "timer");
}

//------------------------------------------------------------------------------

/** Read one HTTP message inside a coroutine and act on the outcome.

    Clears `complete_`, then reads into `message_` with the expiry armed
    for the duration of the read. Afterwards:
      - end of stream -> do_close()
      - timeout       -> on_timer()
      - other error   -> fail()
      - success       -> do_request()

    @param do_yield Coroutine context used to suspend during the read.
*/
template <class Handler, class Impl>
void
BaseHTTPPeer<Handler, Impl>::do_read(yield_context do_yield)
{
    complete_ = false;

    error_code result;
    start_timer();
    boost::beast::http::async_read(
        impl().stream_, read_buf_, message_, do_yield[result]);
    cancel_timer();

    if (result == boost::beast::http::error::end_of_stream)
    {
        do_close();
    }
    else if (result == boost::beast::error::timeout)
    {
        on_timer();
    }
    else if (result)
    {
        fail(result, "http::read");
    }
    else
    {
        do_request();
    }
}

// Completion handler for queued writes; also called directly (with a
// default error code and zero bytes) by write() to start sending.
//
// Moves everything queued in wq_ into wq2_ and sends it as one gathered
// async_write whose completion re-enters this function. Once nothing is
// left to send and the response has been marked complete, the connection
// is either closed (graceful close requested) or the next read is started.
template <class Handler, class Impl>
void
BaseHTTPPeer<Handler, Impl>::on_write(
    error_code const& ec,
    std::size_t bytes_transferred)
{
    cancel_timer();
    if (ec == boost::beast::error::timeout)
    {
        on_timer();
        return;
    }
    if (ec)
    {
        fail(ec, "write");
        return;
    }
    bytes_out_ += bytes_transferred;

    // Drop the buffers that were just sent and take over the pending ones.
    {
        std::lock_guard lock(mutex_);
        wq2_.clear();
        wq2_.reserve(wq_.size());
        std::swap(wq2_, wq_);
    }

    if (wq2_.empty())
    {
        // Nothing left to send.
        if (!complete_)
            return;
        if (graceful_)
        {
            do_close();
            return;
        }
        util::spawn(
            strand_,
            std::bind(
                &BaseHTTPPeer<Handler, Impl>::do_read,
                impl().shared_from_this(),
                std::placeholders::_1));
        return;
    }

    // Describe every pending buffer; the storage itself stays in wq2_
    // until the next call clears it.
    std::vector<boost::asio::const_buffer> chunks;
    chunks.reserve(wq2_.size());
    for (auto const& item : wq2_)
        chunks.emplace_back(item.data.get(), item.bytes);

    start_timer();
    boost::asio::async_write(
        impl().stream_,
        chunks,
        bind_executor(
            strand_,
            std::bind(
                &BaseHTTPPeer::on_write,
                impl().shared_from_this(),
                std::placeholders::_1,
                std::placeholders::_2)));
}

/** Coroutine body that sends the output of a Writer.

    Repeatedly asks the writer to prepare up to `bufferSize` bytes, sends
    what it exposes and tells it how much was consumed, until the writer
    reports completion. If prepare() returns false the coroutine simply
    ends; the `resume` callback handed to prepare() starts a fresh
    do_writer coroutine on the strand when invoked (presumably by the
    writer once more data is available - confirm against Writer).

    @param writer     Source of the data to send.
    @param keep_alive When true a new read is started after the writer
                      completes; otherwise do_close() is called.
    @param do_yield   Coroutine context used to suspend during writes.
*/
template <class Handler, class Impl>
void
BaseHTTPPeer<Handler, Impl>::do_writer(
    std::shared_ptr<Writer> const& writer,
    bool keep_alive,
    yield_context do_yield)
{
    // The callback owns a shared pointer to the peer so it can be invoked
    // after this coroutine has returned.
    std::function<void(void)> resume =
        [this, self = impl().shared_from_this(), writer, keep_alive]() {
            util::spawn(
                strand_,
                std::bind(
                    &BaseHTTPPeer<Handler, Impl>::do_writer,
                    self,
                    writer,
                    keep_alive,
                    std::placeholders::_1));
        };

    do
    {
        if (!writer->prepare(bufferSize, resume))
            return;

        error_code ec;
        auto const sent = boost::asio::async_write(
            impl().stream_,
            writer->data(),
            boost::asio::transfer_at_least(1),
            do_yield[ec]);
        if (ec)
        {
            fail(ec, "writer");
            return;
        }
        writer->consume(sent);
    } while (!writer->complete());

    if (keep_alive)
    {
        util::spawn(
            strand_,
            std::bind(
                &BaseHTTPPeer<Handler, Impl>::do_read,
                impl().shared_from_this(),
                std::placeholders::_1));
        return;
    }

    do_close();
}

//------------------------------------------------------------------------------

// Queue a copy of the caller's bytes for sending.
//
// Empty writes are ignored. The send is only kicked off here when this
// buffer is the sole entry in wq_ and wq2_ is empty; in every other case
// the function just queues the data and returns.
template <class Handler, class Impl>
void
BaseHTTPPeer<Handler, Impl>::write(void const* buf, std::size_t bytes)
{
    if (bytes == 0)
        return;

    bool kick = false;
    {
        std::lock_guard lock(mutex_);
        wq_.emplace_back(buf, bytes);
        kick = (wq_.size() == 1) && wq2_.empty();
    }
    if (!kick)
        return;

    // on_write must run on the strand.
    if (strand_.running_in_this_thread())
    {
        on_write(error_code{}, 0);
        return;
    }

    post(
        strand_,
        std::bind(
            &BaseHTTPPeer::on_write,
            impl().shared_from_this(),
            error_code{},
            0));
}

/** Send the output of `writer` asynchronously.

    Starts do_writer() as a coroutine on the strand; the bound shared
    pointer keeps the peer alive while the coroutine exists.

    @param writer     Source of the data to send.
    @param keep_alive Forwarded to do_writer().
*/
template <class Handler, class Impl>
void
BaseHTTPPeer<Handler, Impl>::write(
    std::shared_ptr<Writer> const& writer,
    bool keep_alive)
{
    auto self = impl().shared_from_this();
    util::spawn(
        strand_,
        std::bind(
            &BaseHTTPPeer<Handler, Impl>::do_writer,
            self,
            writer,
            keep_alive,
            std::placeholders::_1));
}

// DEPRECATED
// Hand out shared ownership of the session so the caller can keep using
// it asynchronously
template <class Handler, class Impl>
std::shared_ptr<Session>
BaseHTTPPeer<Handler, Impl>::detach()
{
    std::shared_ptr<Session> self = impl().shared_from_this();
    return self;
}

// DEPRECATED
// Called to indicate the response has been written (but not sent).
//
// Runs on the strand (re-posting itself if necessary), resets the request
// message and marks the response complete. If any output is still queued
// (wq_) or in flight (wq2_), nothing more is done here: on_write() starts
// the next read once the queue has drained and complete_ is set. Otherwise
// the next read is started right away.
template <class Handler, class Impl>
void
BaseHTTPPeer<Handler, Impl>::complete()
{
    if (!strand_.running_in_this_thread())
        return post(
            strand_,
            std::bind(
                &BaseHTTPPeer<Handler, Impl>::complete,
                impl().shared_from_this()));

    message_ = {};
    complete_ = true;

    {
        std::lock_guard lock(mutex_);
        // Defer to on_write() when EITHER queue still holds data, matching
        // the check in close(bool). Requiring both to be non-empty would
        // start a read while a write is still in flight, and on_write()
        // could then start a second, concurrent read when that write ends.
        if (!wq_.empty() || !wq2_.empty())
            return;
    }

    // keep-alive
    util::spawn(
        strand_,
        std::bind(
            &BaseHTTPPeer<Handler, Impl>::do_read,
            impl().shared_from_this(),
            std::placeholders::_1));
}

// DEPRECATED
// Called from the Handler to close the session.
//
// Always executes on the strand and marks the response complete. A
// non-graceful close shuts the lowest stream layer immediately. A graceful
// close sets graceful_ and calls do_close() only if no output is queued or
// in flight; otherwise on_write() performs the close after the last write.
template <class Handler, class Impl>
void
BaseHTTPPeer<Handler, Impl>::close(bool graceful)
{
    if (!strand_.running_in_this_thread())
    {
        // close is overloaded; pick the bool version explicitly.
        using closer = void (BaseHTTPPeer::*)(bool);
        post(
            strand_,
            std::bind(
                static_cast<closer>(&BaseHTTPPeer<Handler, Impl>::close),
                impl().shared_from_this(),
                graceful));
        return;
    }

    complete_ = true;

    if (!graceful)
    {
        boost::beast::get_lowest_layer(impl().stream_).close();
        return;
    }

    graceful_ = true;

    bool pending = false;
    {
        std::lock_guard lock(mutex_);
        pending = !wq_.empty() || !wq2_.empty();
    }
    if (!pending)
        do_close();
}

}  // namespace ripple

#endif
